package com.nx.platform.es.biz.esspider.reactor;

import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import com.nx.platform.es.biz.esspider.deliver.Deliverer;
import com.nx.platform.es.biz.esspider.deliver.DelivererImpl;
import com.nx.platform.es.biz.esspider.recycle.Recycler;
import com.nx.platform.es.biz.esspider.resource.DBManager;
import com.nx.platform.es.biz.esspider.resource.DBManagerImpl;
import com.nx.platform.es.biz.esspider.sink.Sinks;
import com.nx.platform.es.biz.esspider.source.FileSourceService;
import com.nx.platform.es.biz.esspider.source.RecycleSourceService;
import com.nx.platform.es.common.utils.YamlParser;
import com.nx.platform.es.biz.esspider.handler.Handlers;
import com.nx.platform.es.biz.esspider.handler.HandlersImpl;
import com.nx.platform.es.biz.esspider.process.ProcessorServiceImpl;
import com.nx.platform.es.biz.esspider.recycle.RecyclerImpl;
import com.nx.platform.es.biz.esspider.sink.SinksImpl;
import com.nx.platform.es.common.utils.Constants;
import com.nx.platform.es.common.utils.MoreMaps;
import com.nx.platform.es.service.ESClientManager;
import com.nx.platform.es.service.impl.ESClientManagerImpl;
import com.nx.platform.es.system.config.ConfigCenter;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;


/**
 * 类似Notify
 * @date 2018/04/13
 */
public class ImportReactor extends Reactor {
    private static final Logger LOGGER = LogManager.getLogger(ImportReactor.class);
    private final String configFilePath;
    private final String dataFilePath;
    private final String dbConfigKey;
    private final String esConfigKey;
    private final String handlerConfigKey;
    private final String sinkConfigKey;
    private final String processorConfigKey;
    private final String sourceFiles;
    private final String targetSinks;
    private final String debugSwitchConfigKey;

    private ImportReactor(
            String configFilePath, String dataFilePath, String configKeyPrefix,
            String dbConfigKey, String esConfigKey,
            String handlerConfigKey, String sinkConfigKey, String processorConfigKey,
            String sourceFiles, String targetSinks, String debugSwitchConfigKey) {
        this.configFilePath = configFilePath;
        this.dataFilePath = dataFilePath;
        this.dbConfigKey = configKeyPrefix + "." + dbConfigKey;
        this.processorConfigKey = configKeyPrefix + "." + processorConfigKey;
        this.esConfigKey = configKeyPrefix + "." + esConfigKey;
        this.handlerConfigKey = configKeyPrefix + "." + handlerConfigKey;
        this.sinkConfigKey = configKeyPrefix + "." + sinkConfigKey;
        this.debugSwitchConfigKey = configKeyPrefix + "." + debugSwitchConfigKey;
        this.sourceFiles = sourceFiles;
        this.targetSinks = targetSinks;
    }

    public static Builder builder() {
        return new Builder();
    }

    @Override
    protected void init() {

        // resources
        DBManager dbManager = new DBManagerImpl(configFilePath, dbConfigKey);
        ESClientManager esManager = new ESClientManagerImpl();

        // handlers, sinks, deliverer, recycler
        Handlers handlers = new HandlersImpl(handlerConfigKey, dbManager);
        Sinks sinks = new SinksImpl(sinkConfigKey, esManager, targetSinks);
        Deliverer deliverer = new DelivererImpl();
        Recycler recycler = new RecyclerImpl(dataFilePath);

        // sources
        addSource(new FileSourceService(deliverer, sourceFiles));
        boolean recycleFlag = true;
        try{
            recycleFlag = Boolean.parseBoolean(System.getProperty("recycleFlag"));
        }catch(Exception e) {

        }
        LOGGER.info(" >>>> recycleFlag={}", recycleFlag);
        if(recycleFlag) {
            addSource(new RecycleSourceService(deliverer, recycler));
        }

        // processors
        List<Map<?, ?>> processors = YamlParser.parseToList(ConfigCenter.getConfig(processorConfigKey).orElse(""));
        LOGGER.info(" >>>>>> processors ={}", new Gson().toJson(processors));

        processors.stream().map(map -> Pair.of(MoreMaps.getString(map, "code"), map))
                .map(p -> new ProcessorServiceImpl(p.getLeft(), p.getRight(), handlers, sinks, recycler, debugSwitchConfigKey))
                .peek(deliverer::register).forEach(this::addProcessor);

    }

    @Override
    protected void run() throws Exception {
        FileSourceService source = getSource(FileSourceService.class);
        assert source != null;
        source.awaitTerminated();
    }

    public static class Builder {

        private String configFilePath;
        private String dataFilePath;
        private String configNamespace;
        private String sourceFiles;
        private String targetSinks;

        private Builder() {}

        public ImportReactor build() {
            Preconditions.checkArgument(StringUtils.isNotBlank(configFilePath));
            Preconditions.checkArgument(StringUtils.isNotBlank(dataFilePath));
            Preconditions.checkArgument(StringUtils.isNotBlank(configNamespace));
            Preconditions.checkArgument(StringUtils.isNotBlank(sourceFiles));
            Preconditions.checkArgument(StringUtils.isNotBlank(targetSinks));
            return new ImportReactor(
                    configFilePath, dataFilePath, configNamespace,
                    Constants.KEY_DB, Constants.ES_CLIENT,
                    Constants.KEY_HANDLERS, Constants.KEY_SINKS, Constants.KEY_PROCESSORS,
                    sourceFiles, targetSinks, Constants.KEY_DEBUG);
        }

        public Builder configFilePath(String configFilePath) {
            this.configFilePath = configFilePath;
            return this;
        }

        public Builder dataFilePath(String dataFilePath) {
            this.dataFilePath = dataFilePath;
            return this;
        }

        public Builder configNamespace(String configNamespace) {
            this.configNamespace = configNamespace;
            return this;
        }

        public Builder sourceFiles(String sourceFiles) {
            this.sourceFiles = sourceFiles;
            return this;
        }

        public Builder targetSinks(String targetSinks) {
            this.targetSinks = targetSinks;
            return this;
        }

    }

}
